feat: Use PartitionValueExtractor interface in Spark reader path #17850
Conversation
Force-pushed eb515ef to d5a979e
Wouldn't this break existing users who might have their own implementations of PartitionValueExtractor? We should avoid doing this. Instead, can we introduce a new interface named SparkPartitionValueExtractor in some other Spark package? If need be, this can extend the existing PartitionValueExtractor as well.
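For reference, a minimal sketch of the kind of interface being proposed here (the name SparkPartitionValueExtractor and its package are the reviewer's suggestion; the parent interface is the existing sync-path one):

```scala
package org.apache.hudi.spark

import org.apache.hudi.sync.common.model.PartitionValueExtractor

// Hypothetical Spark-facing extractor type, per the suggestion above.
// Extending the existing sync-path interface keeps current user
// implementations source-compatible while giving the reader path
// its own type to bind against.
trait SparkPartitionValueExtractor extends PartitionValueExtractor
```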
Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
} else if(usePartitionValueExtractorOnRead && !StringUtils.isNullOrEmpty(partitionValueExtractorClass)) {
  try {
    val partitionValueExtractor = Class.forName(partitionValueExtractorClass)
Can we move this to a private method to keep this method lean?
Maybe parsePartitionValuesBasedOnPartitionValueExtractor.
Yes, done.
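For illustration, the extracted helper might look roughly like this (a sketch with assumed names, based on the snippet above; not the committed code):

```scala
import scala.collection.JavaConverters._

import org.apache.hudi.sync.common.model.PartitionValueExtractor
import org.apache.spark.unsafe.types.UTF8String

object PartitionValueParsing {
  // Hypothetical private helper hoisted out of the read path: instantiate
  // the configured extractor reflectively and convert the extracted
  // partition values to Spark's UTF8String representation.
  private[hudi] def parsePartitionValuesBasedOnPartitionValueExtractor(
      partitionValueExtractorClass: String,
      partitionPath: String): Array[UTF8String] = {
    val extractor = Class.forName(partitionValueExtractorClass)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[PartitionValueExtractor]
    extractor.extractPartitionValuesInPath(partitionPath)
      .asScala
      .map(UTF8String.fromString)
      .toArray
  }
}
```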
.withDocumentation("Key Generator type to determine key generator class");

public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty
    .key("hoodie.datasource.hive_sync.partition_extractor_class")
None of the table properties have "hoodie.datasource." as a prefix.
We should define two configs:
hoodie.datasource.hive_sync.partition_extractor_class for the writer property,
and
hoodie.table.hive_sync.partition_extractor_class for the table config.
Users should not be able to set the table property directly; they should always set only the writer property, i.e. hoodie.datasource.hive_sync.partition_extractor_class.
It seems like we are storing configs with the hoodie.datasource prefix, such as the following, so maybe we need to change those as well:
hoodie.datasource.write.drop.partition.columns
hoodie.datasource.write.hive_style_partitioning
Anyway, I have created two different properties: one in HoodieTableConfig and the other in HoodieSyncConfig. Also added validation to make sure people don't give different values for them.
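A rough sketch of the two-property shape and the value-conflict check being described (the key strings come from the review; names, placement, and the validation helper are assumptions):

```scala
import org.apache.hudi.common.config.ConfigProperty

object PartitionExtractorConfigs {
  // Writer-facing property: the only one users should set directly.
  val WRITER_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
    .key("hoodie.datasource.hive_sync.partition_extractor_class")
    .noDefaultValue()
    .withDocumentation("Writer property for the partition value extractor class.")

  // Table config: populated from the writer property, never set directly.
  val TABLE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
    .key("hoodie.table.hive_sync.partition_extractor_class")
    .noDefaultValue()
    .withDocumentation("Table config mirroring the writer property.")

  // Validation along the lines mentioned above: if both are present,
  // they must agree.
  def validate(writerValue: Option[String], tableValue: Option[String]): Unit =
    for (w <- writerValue; t <- tableValue) {
      require(w == t, s"Conflicting partition extractor classes: writer=$w, table=$t")
    }
}
```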
.or(() -> Option.ofNullable(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME)));

if (!partitionFieldsOpt.isPresent()) {
  return Option.empty();
Is this not NonPartitionedExtractor?
Yes, made the change.
*/

package org.apache.hudi.sync.common.model;
package org.apache.hudi.hive.sync;
Let's not change this.
Yeah, my bad. I am using the same package now, but I need to move the interface to hudi-common as there is a compile-time dependency now.
val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty
  .key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
  .defaultValue("true")
Can we disable this by default?
Since we have an infer function, this might get exercised out of the box. Let's keep OOB behavior untouched.
Sure, makes sense.
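That resolution would amount to something like this (a sketch; the key string is copied verbatim from the diff above, including its current spelling):

```scala
import org.apache.hudi.common.config.ConfigProperty

object ReadPathFlags {
  // Same flag as in the diff, with the default flipped to "false" so
  // out-of-the-box read behavior stays untouched.
  val USE_PARTITION_VALUE_EXTRACTOR_ON_READ: ConfigProperty[String] = ConfigProperty
    .key("hoodie.datasource.read.partition.value.using.partion-value-extractor-class")
    .defaultValue("false")
    .withDocumentation("Whether the Spark read path should use the configured " +
      "PartitionValueExtractor to parse partition values; disabled by default.")
}
```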
import java.util

class TestCustomSlashPartitionValueExtractor extends PartitionValueExtractor {
same here
Made the changes.
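For context, a sketch of what a slash-date test extractor like the one in this diff might do, turning a yyyy/mm/dd path into a single yyyy-mm-dd value (illustrative; the committed test may differ):

```scala
import java.util

import org.apache.hudi.sync.common.model.PartitionValueExtractor

// Hypothetical version of the test extractor: "2024/01/03" -> ["2024-01-03"].
class TestCustomSlashPartitionValueExtractor extends PartitionValueExtractor {
  override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
    val parts = partitionPath.split("/")
    require(parts.length == 3, s"Expected a yyyy/mm/dd path, got: $partitionPath")
    util.Collections.singletonList(parts.mkString("-"))
  }
}
```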
.markAdvanced()
.withDocumentation("Field in the table to use for determining hive partition columns.");

public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
Let's leave this as is, without any changes.
Sure, reverting this change.
Seq(7, "a7", 7000, "2024-01-03", "CAN", "ON", "TOR")
)

// Test partition pruning with combined date and state filter
The proper way to assert here: we should corrupt one of the parquet files in another partition which does not match the predicate. Then the query will only succeed if partition pruning really worked; if not, the query will hit a FileNotFoundException.
But we can't afford to do this for every query, since one of the data files will be corrupted. So maybe we can do it for one or two of the partition pruning queries you have here.
Added the test case with corrupted parquet file.
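A sketch of the corruption-based assertion described above: clobber a parquet file in a partition the predicate should prune, then run the filtered query; if pruning works, the corrupt file is never opened, otherwise the scan fails on the garbage bytes (spark, basePath, corruptFilePath, and the predicate are placeholders):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession

// Overwrite a data file in a non-matching partition, then verify the
// pruned query still succeeds.
def assertPruningViaCorruption(spark: SparkSession, basePath: String,
                               corruptFilePath: String): Unit = {
  Files.write(Paths.get(corruptFilePath),
    "not-a-parquet-file".getBytes(StandardCharsets.UTF_8))
  val rows = spark.read.format("hudi").load(basePath)
    .where("date = '2024-01-03' and state = 'ON'") // must not touch the corrupted partition
    .collect()
  assert(rows.nonEmpty)
}
```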
public static final ConfigProperty<String> PARTITION_VALUE_EXTRACTOR_CLASS = ConfigProperty
    .key("hoodie.datasource.hive_sync.partition_extractor_class")
    .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
    .withInferFunction(cfg -> {
We should not be setting any default here, right?
OK to have the infer function.
Sure.
We can't use a config key with "hive_sync" in the name; we plan to use this for reading.
Maybe hoodie.table.partition_value_extractor_class.
But we might have to introduce new partition value extractor classes for the read path instead of using the same one we use for hive sync.
Let me think about it more or chat with others to see how we can go about this.
metadataTable.close()
}
}
I understand hive-style partitioning and the custom partition value extractor are mutually exclusive,
but can we add a test with a custom partition value extractor and URL encoding enabled?
Added
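One way such a test's extractor could look, combining URL-encoded partition paths with custom extraction (illustrative only; the class name and decoding choice are assumptions, not the committed test):

```scala
import java.net.URLDecoder
import java.util

import org.apache.hudi.sync.common.model.PartitionValueExtractor

// Decode each URL-encoded path segment into a partition value,
// e.g. "2024-01-03%2010%3A00" -> "2024-01-03 10:00".
class TestUrlDecodingPartitionValueExtractor extends PartitionValueExtractor {
  override def extractPartitionValuesInPath(partitionPath: String): util.List[String] = {
    val values = new util.ArrayList[String]()
    partitionPath.split("/").foreach(seg => values.add(URLDecoder.decode(seg, "UTF-8")))
    values
  }
}
```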
Force-pushed d1d62f3 to 908fd75
Create unit test
Fix the test custom partition value extractor interface
Force-pushed febf1da to de1f911
nsivabalan left a comment
Hey @yihua: can you take a sneak peek at this PR?
Let's chat about how we can go about it.
}

// Validate partition value extractor
val currentPartitionValueExtractor = params.getOrElse(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), null)
We should make a new writer property.
Let's align before we go ahead with more changes.
if (!partitionFieldsOpt.isPresent()) {
  return Option.empty();
  return Option.of("org.apache.hudi.hive.NonPartitionedExtractor");
Can we move this into a separate PR?
It looks like a bug fix, right?
&& cfg.getString(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key()).equals("true")) {
  return Option.of("org.apache.hudi.hive.HiveStylePartitionValueExtractor");
} else if (cfg.contains(SLASH_SEPARATED_DATE_PARTITIONING)
    && cfg.getString(SLASH_SEPARATED_DATE_PARTITIONING).equals("true")) {
Why change these?
.markAdvanced()
.withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
    + "default 'org.apache.hudi.hive.MultiPartKeysValueExtractor'.");
    + "default is inferred based on partition configuration.");
Let's chat about how the new config interplays with the existing hive sync config.
Describe the issue this Pull Request addresses

This PR enables the use of custom PartitionValueExtractor implementations when reading Hudi tables in Spark, allowing users to define custom logic for extracting partition values from partition paths. Previously, the PartitionValueExtractor interface was only used during write/sync operations, not during read operations.

Summary and Changelog

Users can now configure custom partition value extractors for read operations using the hoodie.datasource.read.partition.value.extractor.class option, enabling support for non-standard partition path formats.

Changes:
- Moved the PartitionValueExtractor interface from hudi-sync-common to hudi-common for broader accessibility
- Added the PARTITION_VALUE_EXTRACTOR_CLASS config to HoodieTableConfig and DataSourceOptions
- Updated HoodieSparkUtils.createPartitionSchema() to use PartitionValueExtractor for partition value extraction
- Updated HoodieFileIndex and related classes to support custom partition value extractors
- Added TestCustomSlashPartitionValueExtractor demonstrating date formatting from slash-separated paths (yyyy/mm/dd → yyyy-mm-dd)
- Updated existing PartitionValueExtractor implementations to use the relocated interface

Impact
Public API Changes:
- New config hoodie.datasource.read.partition.value.extractor.class for Spark reads
- PartitionValueExtractor interface relocated from org.apache.hudi.sync.common.model to org.apache.hudi.hive.sync

User-Facing Changes:
- Users can now customize partition value extraction during read operations by providing a custom PartitionValueExtractor implementation
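For illustration, a read using the new option might look like this (spark, basePath, the fully qualified extractor class, and the filter column are placeholders; only the option key comes from this description):

```scala
// Hypothetical usage of the new read option with the PR's test extractor.
val df = spark.read.format("hudi")
  .option("hoodie.datasource.read.partition.value.extractor.class",
    "org.apache.hudi.TestCustomSlashPartitionValueExtractor")
  .load(basePath)
df.where("partition_date = '2024-01-03'").show()
```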
Risk Level

Low - This is an additive change that maintains backward compatibility. When no custom extractor is specified, the default behavior remains unchanged (standard slash-based splitting). The change has been tested with custom partition value extractor implementations.
Documentation Update

Documentation should be updated to include:
- The new config hoodie.datasource.read.partition.value.extractor.class in the configuration reference (WIP)
- Guidance on custom PartitionValueExtractor implementations for read operations (WIP)

Contributor's checklist